KubernetesExecutor: self.completed adoption set is never drained #68674
FrankYang0529 left a comment
Left some minor comments. You can link the issue to #68683.
@ihorlukianov Could you check the CI error? Thank you.
Independently validated this fix on a live Astro KubernetesExecutor deployment (Airflow 3.2.2, Method
To trigger it deterministically:
Measured Results (identical trigger, only the executor code differs)
Without the fix, a single adopted pod was re-patched up to 21 times and the count was still climbing (
LGTM - the
Drafted-by: Claude Code (Opus 4.8); reviewed by @seanmuth before posting
…che#68674)
* Fix Kubernetes Executor pods deletion storm
* Used dict for better performance; Added UT for delete_worker_pods=False
* Fix formatting
Solves #68683
Airflow version observed 3.2.1
`KubernetesExecutor.sync()` re-runs `_change_state()` over the entire `self.completed` set, and nothing ever removes entries from that set.
Iteration over `self.completed` is nested inside the result-queue `while True` loop.
With `delete_worker_pods=False`, the data structure only grows, with no removals. So every adopted completed pod is re-PATCHed forever, and the set grows monotonically over the scheduler's lifetime.
With `delete_worker_pods=True`, the same happens with pod deletion.
The same pod name is deleted many times within seconds:
This starves the scheduler loop: with many scheduled TIs waiting (~1,855 in my case), most of each loop is spent re-deleting finished pods instead of launching new ones.
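The amplification described above can be reproduced with a toy model. This is only a sketch of the reported loop shape, not the provider's actual code: `buggy_sync`, the event names, and the pod names are hypothetical, and `change_state` stands in for `_change_state()`.

```python
from collections import deque


def buggy_sync(result_queue, completed, change_state):
    """Toy model of the reported pattern (hypothetical, not Airflow code)."""
    while True:
        try:
            result = result_queue.popleft()
        except IndexError:
            break  # queue drained
        change_state(result)
        # Modelled bug: every queue event re-walks the whole adoption set,
        # and nothing removes entries from it afterwards.
        for adopted in completed:
            change_state(adopted)


calls = []
queue = deque(f"event-{i}" for i in range(5))
completed = {"adopted-a", "adopted-b", "adopted-c"}

buggy_sync(queue, completed, calls.append)

print(len(calls))                # 5 * (1 + 3) = 20
print(calls.count("adopted-a"))  # 5, once per queue event
print(len(completed))            # 3, nothing was drained
```

With 5 queue events and 3 adopted pods, the model issues 20 calls in one pass, and the set is still full for the next pass.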
Expected behaviour
Each finished worker pod should be patched or deleted once. Subsequent scheduler loops should not re-issue calls for pods already processed.
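A minimal sketch of a loop with that property, under stated assumptions: `fixed_sync` and `record` are hypothetical names, `completed` is modelled as a dict keyed by pod name (the commit message mentions a dict, but its exact shape is an assumption here), and the state strings are placeholders. It is not the code merged in this PR.

```python
from collections import deque


def fixed_sync(result_queue, completed, change_state):
    """Toy model: each pod is handled once (hypothetical, not Airflow code)."""
    # Drain queue events first; each event is handled exactly once.
    while result_queue:
        pod_name, state = result_queue.popleft()
        change_state(pod_name, state)
    # Then handle adopted pods once per sync(), outside the drain loop.
    for pod_name in list(completed):
        change_state(pod_name, completed[pod_name])
        del completed[pod_name]  # dropped only after the call returned


calls = []


def record(pod_name, state):
    calls.append(pod_name)


completed = {"adopted-a": "success", "adopted-b": "success", "adopted-c": "failed"}
events = deque((f"pod-{i}", "success") for i in range(5))

fixed_sync(events, completed, record)
print(len(calls))  # 5 + 3 = 8

# A later sync() with one new event does not touch the adopted pods again.
fixed_sync(deque([("pod-5", "success")]), completed, record)
print(len(calls))                # 9
print(calls.count("adopted-a"))  # 1
```

Deleting the entry only after `change_state` returns means a failed call leaves the pod in the dict for the next pass; whether that retry behaviour is desirable is a design choice this sketch does not settle.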
Reproduce
Deploy Airflow 3.2.x with `KubernetesExecutor`.
Trigger DAGs with many mapped tasks (e.g. 100+ mapped `trigger` tasks) so pods complete in quick succession.
Inspect scheduler logs:
Observe the same pod names with delete counts >> 1.
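For the last step, a small counting helper may be handy once the scheduler log is saved to a file. The exact log wording is not reproduced here, so the pattern is a parameter: the `DELETE pod=...` lines below are an invented placeholder format for illustration only, not Airflow's real message, and `count_pod_mentions` is a hypothetical name.

```python
import re
from collections import Counter


def count_pod_mentions(lines, pattern):
    """Count matches per pod name, where capture group 1 is the pod name."""
    regex = re.compile(pattern)
    counts = Counter()
    for line in lines:
        match = regex.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Placeholder lines in an invented format; replace with real log lines
# and a pattern that matches the actual delete message.
sample = [
    "DELETE pod=worker-abc",
    "DELETE pod=worker-abc",
    "DELETE pod=worker-xyz",
    "unrelated line",
]

counts = count_pod_mentions(sample, r"DELETE pod=(\S+)")
repeats = {pod: n for pod, n in counts.items() if n > 1}
print(repeats)  # {'worker-abc': 2}
```

Any pod that appears in `repeats` was mentioned more than once, which is the symptom described above.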
RCA
Introduced/regressed around #55797 (Oct 2025), which added a `self.completed` set for orphaned completed pod adoption.
In `KubernetesExecutor.sync()` (`providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`):
`self.completed` is processed inside the `while True` loop that drains `result_queue`; for every completion event, all entries in `self.completed` call `_change_state()` again, each triggering `delete_pod()`:
`self.completed` is never cleared after processing, so entries accumulate and are re-processed on every subsequent completion event.
Expected delete volume ≈ `num_result_queue_events × (1 + len(self.completed))`
Proposed fix
Move `for result in self.completed` outside the result-queue drain loop (once per `sync()`).
Clear `self.completed` after successful processing.
Deduplicate by `pod_name` when adopting completed pods.
Expected impact
Before: `delete_calls ≈ num_result_events × (1 + len(completed))` per `sync()`.
After: `delete_calls ≈ num_result_events + len(completed)` per `sync()`, with `completed` cleared after processing.
Was generative AI tooling used to co-author this PR?
Generated-by: [Cursor Composer] following the guidelines